package com.fwmagic.dynamic_rule.service.impl;

import com.fwmagic.dynamic_rule.bean.LogBean;
import com.fwmagic.dynamic_rule.bean.RuleParam;
import com.fwmagic.dynamic_rule.service.UserProfileQueryService;
import com.fwmagic.dynamic_rule.utils.SystemPrintUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * 用户画像查询服务，hbase查询实现类
 */
@Slf4j
public class UserProfileQueryServiceHbaseImpl implements UserProfileQueryService {

    private static Connection connection;

    private static Table table;

    static {
        log.debug("准备创建Hbase连接……");
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set("hbase.zookeeper.quorum", "hd1:2181,hd2:2181,hd3:2181");
        try {
            connection = ConnectionFactory.createConnection(conf);
            table = connection.getTable(TableName.valueOf("yinew_profile_new"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.debug("创建Hbase连接完成！");
    }

    /**
     * 传入一个用户号，以及要查询的条件
     * 返回这些条件是否满足
     * TODO 将在Hbase查询到的画像数据保存(缓存)起来，已备后续再次查询到
     *
     * @param deviceId
     * @param ruleParam
     * @return
     */
    @Override
    public boolean judgeProfileCondition(String deviceId, RuleParam ruleParam) {
        //构造查询条件
        Get get = new Get(deviceId.getBytes());

        //规则事件中的数据
        HashMap<String, String> userProfileParam = ruleParam.getUserProfileParam();
        Set<Map.Entry<String, String>> entries = userProfileParam.entrySet();

        //构造要查询的列
        for (Map.Entry<String, String> entry : entries) {
            get.addColumn("f".getBytes(), entry.getKey().getBytes());
        }

        try {
            //查询hbase
            long s = System.currentTimeMillis();
            Result result = table.get(get);
            long e = System.currentTimeMillis();
            for (Map.Entry<String, String> entry : entries) {
                String tagName = entry.getKey();
                String tagValue = entry.getValue();
                //查询画像中指定列的值和事件中的值是否相等
                byte[] bytesValue = result.getValue("f".getBytes(), tagName.getBytes());
                if (!(bytesValue != null && new String(bytesValue).equals(entry.getValue()))) {
                    log.debug("规则:{},用户:{},查询了Hbase,不匹配。要求的条件是:{}:{},查询结果为:{},查询耗时:{} ms", ruleParam.getRuleId(), deviceId, tagName, tagValue, new String(bytesValue), (e - s));
                    return false;
                }
            }
            log.debug("规则:{},用户:{},查询了Hbase,匹配成功。查询耗时:{} ms", ruleParam.getRuleId(), deviceId, (e - s));
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}
